#include <test/jtx.h>
#include <test/jtx/Env.h>

#include <xrpld/overlay/Message.h>
#include <xrpld/overlay/Peer.h>
#include <xrpld/overlay/Slot.h>
#include <xrpld/overlay/Squelch.h>
#include <xrpld/overlay/detail/Handshake.h>

#include <xrpl/basics/random.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/protocol/SecretKey.h>
#include <xrpl/protocol/messages.h>

#include <boost/thread.hpp>

#include <chrono>
#include <iostream>
#include <numeric>
#include <optional>

namespace ripple {

namespace test {

using namespace std::chrono;

class Link;

using MessageSPtr = std::shared_ptr<Message>;
using LinkSPtr = std::shared_ptr<Link>;
using PeerSPtr = std::shared_ptr<Peer>;
using PeerWPtr = std::weak_ptr<Peer>;
using SquelchCB =
    std::function<void(PublicKey const&, PeerWPtr const&, std::uint32_t)>;
using UnsquelchCB = std::function<void(PublicKey const&, PeerWPtr const&)>;
using LinkIterCB = std::function<void(Link&, MessageSPtr)>;

static constexpr std::uint32_t MAX_PEERS = 10;
static constexpr std::uint32_t MAX_VALIDATORS = 10;
static constexpr std::uint32_t MAX_MESSAGES = 200000;

/** Simulate two entities - peer directly connected to the server
 * (via squelch in PeerSim) and PeerImp (via Overlay)
 */
class PeerPartial : public Peer
{
public:
    PeerPartial()
        : nodePublicKey_(derivePublicKey(KeyType::ed25519, randomSecretKey()))
    {
    }

    PublicKey nodePublicKey_;
    virtual ~PeerPartial()
    {
    }
    virtual void
    onMessage(MessageSPtr const& m, SquelchCB f) = 0;
    virtual void
    onMessage(protocol::TMSquelch const& squelch) = 0;
    void
    send(protocol::TMSquelch const& squelch)
    {
        onMessage(squelch);
    }

    // dummy implementation
    void
    send(std::shared_ptr<Message> const& m) override
    {
    }
    beast::IP::Endpoint
    getRemoteAddress() const override
    {
        return {};
    }
    void
    charge(Resource::Charge const& fee, std::string const& context = {})
        override
    {
    }
    bool
    cluster() const override
    {
        return false;
    }
    bool
    isHighLatency() const override
    {
        return false;
    }
    int
    getScore(bool) const override
    {
        return 0;
    }
    PublicKey const&
    getNodePublic() const override
    {
        return nodePublicKey_;
    }
    Json::Value
    json() override
    {
        return {};
    }
    bool
    supportsFeature(ProtocolFeature f) const override
    {
        return false;
    }
    std::optional<std::size_t>
    publisherListSequence(PublicKey const&) const override
    {
        return {};
    }
    void
    setPublisherListSequence(PublicKey const&, std::size_t const) override
    {
    }
    uint256 const&
    getClosedLedgerHash() const override
    {
        static uint256 hash{};
        return hash;
    }
    bool
    hasLedger(uint256 const& hash, std::uint32_t seq) const override
    {
        return false;
    }
    void
    ledgerRange(std::uint32_t& minSeq, std::uint32_t& maxSeq) const override
    {
    }
    bool
    hasTxSet(uint256 const& hash) const override
    {
        return false;
    }
    void
    cycleStatus() override
    {
    }
    bool
    hasRange(std::uint32_t uMin, std::uint32_t uMax) override
    {
        return false;
    }
    bool
    compressionEnabled() const override
    {
        return false;
    }
    bool
    txReduceRelayEnabled() const override
    {
        return false;
    }
    void
    sendTxQueue() override
    {
    }
    void
    addTxQueue(uint256 const&) override
    {
    }
    void
    removeTxQueue(uint256 const&) override
    {
    }
};

/** Manually advanced clock. */
class ManualClock
{
public:
    typedef uint64_t rep;
    typedef std::milli period;
    typedef std::chrono::duration<std::uint32_t, period> duration;
    typedef std::chrono::time_point<ManualClock> time_point;
    inline static bool const is_steady = false;

    static void
    advance(duration d) noexcept
    {
        now_ += d;
    }

    static void
    randAdvance(milliseconds min, milliseconds max)
    {
        now_ += randDuration(min, max);
    }

    static void
    reset() noexcept
    {
        now_ = time_point(seconds(0));
    }

    static time_point
    now() noexcept
    {
        return now_;
    }

    static duration
    randDuration(milliseconds min, milliseconds max)
    {
        return duration(milliseconds(rand_int(min.count(), max.count())));
    }

    explicit ManualClock() = default;

private:
    inline static time_point now_ = time_point(seconds(0));
};

/** Simulate server's OverlayImpl */
class Overlay
{
public:
    Overlay() = default;
    virtual ~Overlay() = default;

    virtual void
    updateSlotAndSquelch(
        uint256 const& key,
        PublicKey const& validator,
        Peer::id_t id,
        SquelchCB f,
        protocol::MessageType type = protocol::mtVALIDATION) = 0;

    virtual void deleteIdlePeers(UnsquelchCB) = 0;

    virtual void deletePeer(Peer::id_t, UnsquelchCB) = 0;
};

class Validator;

/** Simulate link from a validator to a peer directly connected
 * to the server.
 */
class Link
{
    using Latency = std::pair<milliseconds, milliseconds>;

public:
    Link(
        Validator& validator,
        PeerSPtr peer,
        Latency const& latency = {milliseconds(5), milliseconds(15)})
        : validator_(validator), peer_(peer), latency_(latency), up_(true)
    {
        auto sp = peer_.lock();
        assert(sp);
    }
    ~Link() = default;
    void
    send(MessageSPtr const& m, SquelchCB f)
    {
        if (!up_)
            return;
        auto sp = peer_.lock();
        assert(sp);
        auto peer = std::dynamic_pointer_cast<PeerPartial>(sp);
        peer->onMessage(m, f);
    }
    Validator&
    validator()
    {
        return validator_;
    }
    void
    up(bool linkUp)
    {
        up_ = linkUp;
    }
    Peer::id_t
    peerId()
    {
        auto p = peer_.lock();
        assert(p);
        return p->id();
    }
    PeerSPtr
    getPeer()
    {
        auto p = peer_.lock();
        assert(p);
        return p;
    }

private:
    Validator& validator_;
    PeerWPtr peer_;
    Latency latency_;
    bool up_;
};

/** Simulate Validator */
class Validator
{
    using Links = std::unordered_map<Peer::id_t, LinkSPtr>;

public:
    Validator() : pkey_(std::get<0>(randomKeyPair(KeyType::ed25519)))
    {
        protocol::TMValidation v;
        v.set_validation("validation");
        message_ = std::make_shared<Message>(v, protocol::mtVALIDATION, pkey_);
        id_ = sid_++;
    }
    Validator(Validator const&) = default;
    Validator(Validator&&) = default;
    Validator&
    operator=(Validator const&) = default;
    Validator&
    operator=(Validator&&) = default;
    ~Validator()
    {
        clear();
    }

    void
    clear()
    {
        links_.clear();
    }

    static void
    resetId()
    {
        sid_ = 0;
    }

    PublicKey const&
    key()
    {
        return pkey_;
    }

    operator PublicKey() const
    {
        return pkey_;
    }

    void
    addPeer(PeerSPtr peer)
    {
        links_.emplace(
            std::make_pair(peer->id(), std::make_shared<Link>(*this, peer)));
    }

    void
    deletePeer(Peer::id_t id)
    {
        links_.erase(id);
    }

    void
    for_links(std::vector<Peer::id_t> peers, LinkIterCB f)
    {
        for (auto id : peers)
        {
            assert(links_.find(id) != links_.end());
            f(*links_[id], message_);
        }
    }

    void
    for_links(LinkIterCB f, bool simulateSlow = false)
    {
        std::vector<LinkSPtr> v;
        std::transform(
            links_.begin(), links_.end(), std::back_inserter(v), [](auto& kv) {
                return kv.second;
            });
        std::random_device d;
        std::mt19937 g(d());
        std::shuffle(v.begin(), v.end(), g);

        for (auto& link : v)
        {
            f(*link, message_);
        }
    }

    /** Send to specific peers */
    void
    send(std::vector<Peer::id_t> peers, SquelchCB f)
    {
        for_links(peers, [&](Link& link, MessageSPtr m) { link.send(m, f); });
    }

    /** Send to all peers */
    void
    send(SquelchCB f)
    {
        for_links([&](Link& link, MessageSPtr m) { link.send(m, f); });
    }

    MessageSPtr
    message()
    {
        return message_;
    }

    std::uint16_t
    id()
    {
        return id_;
    }

    void
    linkUp(Peer::id_t id)
    {
        auto it = links_.find(id);
        assert(it != links_.end());
        it->second->up(true);
    }

    void
    linkDown(Peer::id_t id)
    {
        auto it = links_.find(id);
        assert(it != links_.end());
        it->second->up(false);
    }

private:
    Links links_;
    PublicKey pkey_;
    MessageSPtr message_ = nullptr;
    inline static std::uint16_t sid_ = 0;
    std::uint16_t id_ = 0;
};

class PeerSim : public PeerPartial, public std::enable_shared_from_this<PeerSim>
{
public:
    using id_t = Peer::id_t;
    PeerSim(Overlay& overlay, beast::Journal journal)
        : overlay_(overlay), squelch_(journal)
    {
        id_ = sid_++;
    }

    ~PeerSim() = default;

    id_t
    id() const override
    {
        return id_;
    }

    std::string const&
    fingerprint() const override
    {
        return fingerprint_;
    }

    static void
    resetId()
    {
        sid_ = 0;
    }

    /** Local Peer (PeerImp) */
    void
    onMessage(MessageSPtr const& m, SquelchCB f) override
    {
        auto validator = m->getValidatorKey();
        assert(validator);
        if (!squelch_.expireSquelch(*validator))
            return;

        overlay_.updateSlotAndSquelch({}, *validator, id(), f);
    }

    /** Remote Peer (Directly connected Peer) */
    virtual void
    onMessage(protocol::TMSquelch const& squelch) override
    {
        auto validator = squelch.validatorpubkey();
        PublicKey key(Slice(validator.data(), validator.size()));
        if (squelch.squelch())
            squelch_.addSquelch(
                key, std::chrono::seconds{squelch.squelchduration()});
        else
            squelch_.removeSquelch(key);
    }

private:
    inline static id_t sid_ = 0;
    std::string fingerprint_;
    id_t id_;
    Overlay& overlay_;
    reduce_relay::Squelch<ManualClock> squelch_;
};

class OverlaySim : public Overlay, public reduce_relay::SquelchHandler
{
    using Peers = std::unordered_map<Peer::id_t, PeerSPtr>;

public:
    using id_t = Peer::id_t;
    using clock_type = ManualClock;
    OverlaySim(Application& app)
        : slots_(app.logs(), *this, app.config()), logs_(app.logs())
    {
    }

    ~OverlaySim() = default;

    void
    clear()
    {
        peers_.clear();
        ManualClock::advance(hours(1));
        slots_.deleteIdlePeers();
    }

    std::uint16_t
    inState(PublicKey const& validator, reduce_relay::PeerState state)
    {
        auto res = slots_.inState(validator, state);
        return res ? *res : 0;
    }

    void
    updateSlotAndSquelch(
        uint256 const& key,
        PublicKey const& validator,
        Peer::id_t id,
        SquelchCB f,
        protocol::MessageType type = protocol::mtVALIDATION) override
    {
        squelch_ = f;
        slots_.updateSlotAndSquelch(key, validator, id, type);
    }

    void
    deletePeer(id_t id, UnsquelchCB f) override
    {
        unsquelch_ = f;
        slots_.deletePeer(id, true);
    }

    void
    deleteIdlePeers(UnsquelchCB f) override
    {
        unsquelch_ = f;
        slots_.deleteIdlePeers();
    }

    PeerSPtr
    addPeer(bool useCache = true)
    {
        PeerSPtr peer{};
        Peer::id_t id;
        if (peersCache_.empty() || !useCache)
        {
            peer = std::make_shared<PeerSim>(*this, logs_.journal("Squelch"));
            id = peer->id();
        }
        else
        {
            auto it = peersCache_.begin();
            peer = it->second;
            id = it->first;
            peersCache_.erase(it);
        }
        peers_.emplace(std::make_pair(id, peer));
        return peer;
    }

    void
    deletePeer(Peer::id_t id, bool useCache = true)
    {
        auto it = peers_.find(id);
        assert(it != peers_.end());
        deletePeer(id, [&](PublicKey const&, PeerWPtr) {});
        if (useCache)
            peersCache_.emplace(std::make_pair(id, it->second));
        peers_.erase(it);
    }

    void
    resetPeers()
    {
        while (!peers_.empty())
            deletePeer(peers_.begin()->first);
        while (!peersCache_.empty())
            addPeer();
    }

    std::optional<Peer::id_t>
    deleteLastPeer()
    {
        if (peers_.empty())
            return {};

        std::uint8_t maxId = 0;

        for (auto& [id, _] : peers_)
        {
            (void)_;
            if (id > maxId)
                maxId = id;
        }

        deletePeer(maxId, false);

        return maxId;
    }

    bool
    isCountingState(PublicKey const& validator)
    {
        return slots_.inState(validator, reduce_relay::SlotState::Counting);
    }

    std::set<id_t>
    getSelected(PublicKey const& validator)
    {
        return slots_.getSelected(validator);
    }

    bool
    isSelected(PublicKey const& validator, Peer::id_t peer)
    {
        auto selected = slots_.getSelected(validator);
        return selected.find(peer) != selected.end();
    }

    id_t
    getSelectedPeer(PublicKey const& validator)
    {
        auto selected = slots_.getSelected(validator);
        assert(selected.size());
        return *selected.begin();
    }

    std::unordered_map<
        id_t,
        std::tuple<
            reduce_relay::PeerState,
            std::uint16_t,
            std::uint32_t,
            std::uint32_t>>
    getPeers(PublicKey const& validator)
    {
        return slots_.getPeers(validator);
    }

    std::uint16_t
    getNumPeers() const
    {
        return peers_.size();
    }

private:
    void
    squelch(
        PublicKey const& validator,
        Peer::id_t id,
        std::uint32_t squelchDuration) const override
    {
        if (auto it = peers_.find(id); it != peers_.end())
            squelch_(validator, it->second, squelchDuration);
    }
    void
    unsquelch(PublicKey const& validator, Peer::id_t id) const override
    {
        if (auto it = peers_.find(id); it != peers_.end())
            unsquelch_(validator, it->second);
    }
    SquelchCB squelch_;
    UnsquelchCB unsquelch_;
    Peers peers_;
    Peers peersCache_;
    reduce_relay::Slots<ManualClock> slots_;
    Logs& logs_;
};

class Network
{
public:
    Network(Application& app) : overlay_(app)
    {
        init();
    }

    void
    init()
    {
        validators_.resize(MAX_VALIDATORS);
        for (int p = 0; p < MAX_PEERS; p++)
        {
            auto peer = overlay_.addPeer();
            for (auto& v : validators_)
                v.addPeer(peer);
        }
    }

    ~Network() = default;

    void
    reset()
    {
        validators_.clear();
        overlay_.clear();
        PeerSim::resetId();
        Validator::resetId();
        init();
    }

    Peer::id_t
    addPeer()
    {
        auto peer = overlay_.addPeer();
        for (auto& v : validators_)
            v.addPeer(peer);
        return peer->id();
    }

    void
    deleteLastPeer()
    {
        auto id = overlay_.deleteLastPeer();

        if (!id)
            return;

        for (auto& validator : validators_)
            validator.deletePeer(*id);
    }

    void
    purgePeers()
    {
        while (overlay_.getNumPeers() > MAX_PEERS)
            deleteLastPeer();
    }

    Validator&
    validator(std::uint16_t v)
    {
        assert(v < validators_.size());
        return validators_[v];
    }

    OverlaySim&
    overlay()
    {
        return overlay_;
    }

    void
    enableLink(std::uint16_t validatorId, Peer::id_t peer, bool enable)
    {
        auto it =
            std::find_if(validators_.begin(), validators_.end(), [&](auto& v) {
                return v.id() == validatorId;
            });
        assert(it != validators_.end());
        if (enable)
            it->linkUp(peer);
        else
            it->linkDown(peer);
    }

    void
    onDisconnectPeer(Peer::id_t peer)
    {
        // Send unsquelch to the Peer on all links. This way when
        // the Peer "reconnects" it starts sending messages on the link.
        // We expect that if a Peer disconnects and then reconnects, it's
        // unsquelched.
        protocol::TMSquelch squelch;
        squelch.set_squelch(false);
        for (auto& v : validators_)
        {
            PublicKey key = v;
            squelch.clear_validatorpubkey();
            squelch.set_validatorpubkey(key.data(), key.size());
            v.for_links({peer}, [&](Link& l, MessageSPtr) {
                std::dynamic_pointer_cast<PeerSim>(l.getPeer())->send(squelch);
            });
        }
    }

    void
    for_rand(
        std::uint32_t min,
        std::uint32_t max,
        std::function<void(std::uint32_t)> f)
    {
        auto size = max - min;
        std::vector<std::uint32_t> s(size);
        std::iota(s.begin(), s.end(), min);
        std::random_device d;
        std::mt19937 g(d());
        std::shuffle(s.begin(), s.end(), g);
        for (auto v : s)
            f(v);
    }

    void
    propagate(
        LinkIterCB link,
        std::uint16_t nValidators = MAX_VALIDATORS,
        std::uint32_t nMessages = MAX_MESSAGES,
        bool purge = true,
        bool resetClock = true)
    {
        if (resetClock)
            ManualClock::reset();

        if (purge)
        {
            purgePeers();
            overlay_.resetPeers();
        }

        for (int m = 0; m < nMessages; ++m)
        {
            ManualClock::randAdvance(milliseconds(1800), milliseconds(2200));
            for_rand(0, nValidators, [&](std::uint32_t v) {
                validators_[v].for_links(link);
            });
        }
    }

    /** Is peer in Selected state in any of the slots */
    bool
    isSelected(Peer::id_t id)
    {
        for (auto& v : validators_)
        {
            if (overlay_.isSelected(v, id))
                return true;
        }
        return false;
    }

    /** Check if there are peers to unsquelch - peer is in Selected
     * state in any of the slots and there are peers in Squelched state
     * in those slots.
     */
    bool
    allCounting(Peer::id_t peer)
    {
        for (auto& v : validators_)
        {
            if (!overlay_.isSelected(v, peer))
                continue;
            auto peers = overlay_.getPeers(v);
            for (auto& [_, v] : peers)
            {
                (void)_;
                if (std::get<reduce_relay::PeerState>(v) ==
                    reduce_relay::PeerState::Squelched)
                    return false;
            }
        }
        return true;
    }

private:
    OverlaySim overlay_;
    std::vector<Validator> validators_;
};

class reduce_relay_test : public beast::unit_test::suite
{
    using Slot = reduce_relay::Slot<ManualClock>;
    using id_t = Peer::id_t;

protected:
    void
    printPeers(std::string const& msg, std::uint16_t validator = 0)
    {
        auto peers = network_.overlay().getPeers(network_.validator(validator));
        std::cout << msg << " "
                  << "num peers " << (int)network_.overlay().getNumPeers()
                  << std::endl;
        for (auto& [k, v] : peers)
            std::cout << k << ":" << (int)std::get<reduce_relay::PeerState>(v)
                      << " ";
        std::cout << std::endl;
    }

    /** Send squelch (if duration is set) or unsquelch (if duration not set) */
    Peer::id_t
    sendSquelch(
        PublicKey const& validator,
        PeerWPtr const& peerPtr,
        std::optional<std::uint32_t> duration)
    {
        protocol::TMSquelch squelch;
        bool res = duration ? true : false;
        squelch.set_squelch(res);
        squelch.set_validatorpubkey(validator.data(), validator.size());
        if (res)
            squelch.set_squelchduration(*duration);
        auto sp = peerPtr.lock();
        assert(sp);
        std::dynamic_pointer_cast<PeerSim>(sp)->send(squelch);
        return sp->id();
    }

    enum State { On, Off, WaitReset };
    enum EventType { LinkDown = 0, PeerDisconnected = 1 };
    // Link down or Peer disconnect event
    // TBD - add new peer event
    // TBD - add overlapping type of events at any
    //       time in any quantity
    struct Event
    {
        State state_ = State::Off;
        std::uint32_t cnt_ = 0;
        std::uint32_t handledCnt_ = 0;
        bool isSelected_ = false;
        Peer::id_t peer_;
        std::uint16_t validator_;
        std::optional<PublicKey> key_;
        time_point<ManualClock> time_;
        bool handled_ = false;
    };

    /** Randomly brings the link between a validator and a peer down.
     * Randomly disconnects a peer. Those events are generated one at a time.
     */
    void
    random(bool log)
    {
        std::unordered_map<EventType, Event> events{
            {LinkDown, {}}, {PeerDisconnected, {}}};
        time_point<ManualClock> lastCheck = ManualClock::now();

        network_.reset();
        network_.propagate([&](Link& link, MessageSPtr m) {
            auto& validator = link.validator();
            auto now = ManualClock::now();

            bool squelched = false;
            std::stringstream str;

            link.send(
                m,
                [&](PublicKey const& key,
                    PeerWPtr const& peerPtr,
                    std::uint32_t duration) {
                    assert(key == validator);
                    auto p = sendSquelch(key, peerPtr, duration);
                    squelched = true;
                    str << p << " ";
                });

            if (squelched)
            {
                auto selected = network_.overlay().getSelected(validator);
                str << " selected: ";
                for (auto s : selected)
                    str << s << " ";
                if (log)
                    std::cout
                        << (double)reduce_relay::epoch<milliseconds>(now)
                                .count() /
                            1000.
                        << " random, squelched, validator: " << validator.id()
                        << " peers: " << str.str() << std::endl;
                auto countingState =
                    network_.overlay().isCountingState(validator);
                BEAST_EXPECT(
                    countingState == false &&
                    selected.size() ==
                        env_.app()
                            .config()
                            .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
            }

            // Trigger Link Down or Peer Disconnect event
            // Only one Link Down at a time
            if (events[EventType::LinkDown].state_ == State::Off)
            {
                auto update = [&](EventType event) {
                    events[event].cnt_++;
                    events[event].validator_ = validator.id();
                    events[event].key_ = validator;
                    events[event].peer_ = link.peerId();
                    events[event].state_ = State::On;
                    events[event].time_ = now;
                    if (event == EventType::LinkDown)
                    {
                        network_.enableLink(
                            validator.id(), link.peerId(), false);
                        events[event].isSelected_ =
                            network_.overlay().isSelected(
                                validator, link.peerId());
                    }
                    else
                        events[event].isSelected_ =
                            network_.isSelected(link.peerId());
                };
                auto r = rand_int(0, 1000);
                if (r == (int)EventType::LinkDown ||
                    r == (int)EventType::PeerDisconnected)
                {
                    update(static_cast<EventType>(r));
                }
            }

            if (events[EventType::PeerDisconnected].state_ == State::On)
            {
                auto& event = events[EventType::PeerDisconnected];
                bool allCounting = network_.allCounting(event.peer_);
                network_.overlay().deletePeer(
                    event.peer_,
                    [&](PublicKey const& v, PeerWPtr const& peerPtr) {
                        if (event.isSelected_)
                            sendSquelch(v, peerPtr, {});
                        event.handled_ = true;
                    });
                // Should only be unsquelched if the peer is in Selected state
                // If in Selected state it's possible unsquelching didn't
                // take place because there is no peers in Squelched state in
                // any of the slots where the peer is in Selected state
                // (allCounting is true)
                bool handled =
                    (event.isSelected_ == false && !event.handled_) ||
                    (event.isSelected_ == true &&
                     (event.handled_ || allCounting));
                BEAST_EXPECT(handled);
                event.state_ = State::Off;
                event.isSelected_ = false;
                event.handledCnt_ += handled;
                event.handled_ = false;
                network_.onDisconnectPeer(event.peer_);
            }

            auto& event = events[EventType::LinkDown];
            // Check every sec for idled peers. Idled peers are
            // created by Link Down event.
            if (now - lastCheck > milliseconds(1000))
            {
                lastCheck = now;
                // Check if Link Down event must be handled by
                // deleteIdlePeer(): 1) the peer is in Selected state;
                // 2) the peer has not received any messages for IDLED time;
                // 3) there are peers in Squelched state in the slot.
                // 4) peer is in Slot's peers_ (if not then it is deleted
                //    by Slots::deleteIdlePeers())
                bool mustHandle = false;
                if (event.state_ == State::On && BEAST_EXPECT(event.key_))
                {
                    event.isSelected_ =
                        network_.overlay().isSelected(*event.key_, event.peer_);
                    auto peers = network_.overlay().getPeers(*event.key_);
                    auto d = reduce_relay::epoch<milliseconds>(now).count() -
                        std::get<3>(peers[event.peer_]);
                    mustHandle = event.isSelected_ &&
                        d > milliseconds(reduce_relay::IDLED).count() &&
                        network_.overlay().inState(
                            *event.key_, reduce_relay::PeerState::Squelched) >
                            0 &&
                        peers.find(event.peer_) != peers.end();
                }
                network_.overlay().deleteIdlePeers(
                    [&](PublicKey const& v, PeerWPtr const& ptr) {
                        event.handled_ = true;
                        if (mustHandle && v == event.key_)
                        {
                            event.state_ = State::WaitReset;
                            sendSquelch(validator, ptr, {});
                        }
                    });
                bool handled =
                    (event.handled_ && event.state_ == State::WaitReset) ||
                    (!event.handled_ && !mustHandle);
                BEAST_EXPECT(handled);
            }
            if (event.state_ == State::WaitReset ||
                (event.state_ == State::On &&
                 (now - event.time_ > (reduce_relay::IDLED + seconds(2)))))
            {
                bool handled =
                    event.state_ == State::WaitReset || !event.handled_;
                BEAST_EXPECT(handled);
                event.state_ = State::Off;
                event.isSelected_ = false;
                event.handledCnt_ += handled;
                event.handled_ = false;
                network_.enableLink(event.validator_, event.peer_, true);
            }
        });

        auto& down = events[EventType::LinkDown];
        auto& disconnected = events[EventType::PeerDisconnected];
        // It's possible the last Down Link event is not handled
        BEAST_EXPECT(down.handledCnt_ >= down.cnt_ - 1);
        // All Peer Disconnect events must be handled
        BEAST_EXPECT(disconnected.cnt_ == disconnected.handledCnt_);
        if (log)
            std::cout << "link down count: " << down.cnt_ << "/"
                      << down.handledCnt_
                      << " peer disconnect count: " << disconnected.cnt_ << "/"
                      << disconnected.handledCnt_;
    }

    bool
    checkCounting(PublicKey const& validator, bool isCountingState)
    {
        auto countingState = network_.overlay().isCountingState(validator);
        BEAST_EXPECT(countingState == isCountingState);
        return countingState == isCountingState;
    }

    void
    doTest(std::string const& msg, bool log, std::function<void(bool)> f)
    {
        testcase(msg);
        f(log);
    }

    /** Initial counting round: three peers receive message "faster" then
     * others. Once the message count for the three peers reaches threshold
     * the rest of the peers are squelched and the slot for the given validator
     * is in Selected state.
     */
    void
    testInitialRound(bool log)
    {
        doTest("Initial Round", log, [this](bool log) {
            BEAST_EXPECT(propagateAndSquelch(log));
        });
    }

    /** Receiving message from squelched peer too soon should not change the
     * slot's state to Counting.
     */
    void
    testPeerUnsquelchedTooSoon(bool log)
    {
        doTest("Peer Unsquelched Too Soon", log, [this](bool log) {
            BEAST_EXPECT(propagateNoSquelch(log, 1, false, false, false));
        });
    }

    /** Receiving message from squelched peer should change the
     * slot's state to Counting.
     */
    void
    testPeerUnsquelched(bool log)
    {
        ManualClock::advance(seconds(601));
        doTest("Peer Unsquelched", log, [this](bool log) {
            BEAST_EXPECT(propagateNoSquelch(log, 2, true, true, false));
        });
    }

    /** Propagate enough messages to generate one squelch event */
    bool
    propagateAndSquelch(bool log, bool purge = true, bool resetClock = true)
    {
        int n = 0;
        network_.propagate(
            [&](Link& link, MessageSPtr message) {
                std::uint16_t squelched = 0;
                link.send(
                    message,
                    [&](PublicKey const& key,
                        PeerWPtr const& peerPtr,
                        std::uint32_t duration) {
                        squelched++;
                        sendSquelch(key, peerPtr, duration);
                    });
                if (squelched)
                {
                    BEAST_EXPECT(
                        squelched ==
                        MAX_PEERS -
                            env_.app()
                                .config()
                                .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
                    n++;
                }
            },
            1,
            reduce_relay::MAX_MESSAGE_THRESHOLD + 2,
            purge,
            resetClock);
        auto selected = network_.overlay().getSelected(network_.validator(0));
        BEAST_EXPECT(
            selected.size() ==
            env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
        BEAST_EXPECT(n == 1);  // only one selection round
        auto res = checkCounting(network_.validator(0), false);
        BEAST_EXPECT(res);
        return n == 1 && res;
    }

    /** Send fewer message so that squelch event is not generated */
    bool
    propagateNoSquelch(
        bool log,
        std::uint16_t nMessages,
        bool countingState,
        bool purge = true,
        bool resetClock = true)
    {
        bool squelched = false;
        network_.propagate(
            [&](Link& link, MessageSPtr message) {
                link.send(
                    message,
                    [&](PublicKey const& key,
                        PeerWPtr const& peerPtr,
                        std::uint32_t duration) {
                        squelched = true;
                        BEAST_EXPECT(false);
                    });
            },
            1,
            nMessages,
            purge,
            resetClock);
        auto res = checkCounting(network_.validator(0), countingState);
        return !squelched && res;
    }

    /** Receiving a message from new peer should change the
     * slot's state to Counting.
     */
    void
    testNewPeer(bool log)
    {
        doTest("New Peer", log, [this](bool log) {
            BEAST_EXPECT(propagateAndSquelch(log, true, false));
            network_.addPeer();
            BEAST_EXPECT(propagateNoSquelch(log, 1, true, false, false));
        });
    }

    /** Selected peer disconnects. Should change the state to counting and
     * unsquelch squelched peers. */
    void
    testSelectedPeerDisconnects(bool log)
    {
        doTest("Selected Peer Disconnects", log, [this](bool log) {
            ManualClock::advance(seconds(601));
            BEAST_EXPECT(propagateAndSquelch(log, true, false));
            auto id = network_.overlay().getSelectedPeer(network_.validator(0));
            std::uint16_t unsquelched = 0;
            network_.overlay().deletePeer(
                id, [&](PublicKey const& key, PeerWPtr const& peer) {
                    unsquelched++;
                });
            BEAST_EXPECT(
                unsquelched ==
                MAX_PEERS -
                    env_.app()
                        .config()
                        .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
            BEAST_EXPECT(checkCounting(network_.validator(0), true));
        });
    }

    /** Selected peer stops relaying. Should change the state to counting and
     * unsquelch squelched peers. */
    void
    testSelectedPeerStopsRelaying(bool log)
    {
        doTest("Selected Peer Stops Relaying", log, [this](bool log) {
            ManualClock::advance(seconds(601));
            BEAST_EXPECT(propagateAndSquelch(log, true, false));
            ManualClock::advance(reduce_relay::IDLED + seconds(1));
            std::uint16_t unsquelched = 0;
            network_.overlay().deleteIdlePeers(
                [&](PublicKey const& key, PeerWPtr const& peer) {
                    unsquelched++;
                });
            auto peers = network_.overlay().getPeers(network_.validator(0));
            BEAST_EXPECT(
                unsquelched ==
                MAX_PEERS -
                    env_.app()
                        .config()
                        .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
            BEAST_EXPECT(checkCounting(network_.validator(0), true));
        });
    }

    /** Squelched peer disconnects. Should not change the state to counting.
     */
    void
    testSquelchedPeerDisconnects(bool log)
    {
        doTest("Squelched Peer Disconnects", log, [this](bool log) {
            ManualClock::advance(seconds(601));
            BEAST_EXPECT(propagateAndSquelch(log, true, false));
            auto peers = network_.overlay().getPeers(network_.validator(0));
            auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) {
                return std::get<reduce_relay::PeerState>(it.second) ==
                    reduce_relay::PeerState::Squelched;
            });
            assert(it != peers.end());
            std::uint16_t unsquelched = 0;
            network_.overlay().deletePeer(
                it->first, [&](PublicKey const& key, PeerWPtr const& peer) {
                    unsquelched++;
                });
            BEAST_EXPECT(unsquelched == 0);
            BEAST_EXPECT(checkCounting(network_.validator(0), false));
        });
    }

    void
    testConfig(bool log)
    {
        doTest("Test Config - squelch enabled (legacy)", log, [&](bool log) {
            Config c;

            std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_enable=1
)rippleConfig");

            c.loadFromString(toLoad);
            BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == true);
        });

        doTest("Test Config - squelch disabled (legacy)", log, [&](bool log) {
            Config c;

            std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_enable=0
)rippleConfig");

            c.loadFromString(toLoad);
            BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == false);

            Config c1;

            toLoad = R"rippleConfig(
[reduce_relay]
)rippleConfig";

            c1.loadFromString(toLoad);
            BEAST_EXPECT(c1.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == false);
        });

        doTest("Test Config - squelch enabled", log, [&](bool log) {
            Config c;

            std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_base_squelch_enable=1
)rippleConfig");

            c.loadFromString(toLoad);
            BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == true);
        });

        doTest("Test Config - squelch disabled", log, [&](bool log) {
            Config c;

            std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_base_squelch_enable=0
)rippleConfig");

            c.loadFromString(toLoad);
            BEAST_EXPECT(c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE == false);
        });

        doTest("Test Config - legacy and new", log, [&](bool log) {
            Config c;

            std::string toLoad(R"rippleConfig(
[reduce_relay]
vp_base_squelch_enable=0
vp_enable=0
)rippleConfig");

            std::string error;
            auto const expectedError =
                "Invalid reduce_relay"
                " cannot specify both vp_base_squelch_enable and vp_enable "
                "options. "
                "vp_enable was deprecated and replaced by "
                "vp_base_squelch_enable";

            try
            {
                c.loadFromString(toLoad);
            }
            catch (std::runtime_error& e)
            {
                error = e.what();
            }

            BEAST_EXPECT(error == expectedError);
        });

        doTest("Test Config - max selected peers", log, [&](bool log) {
            Config c;

            std::string toLoad(R"rippleConfig(
[reduce_relay]
)rippleConfig");

            c.loadFromString(toLoad);
            BEAST_EXPECT(c.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS == 5);

            Config c1;

            toLoad = R"rippleConfig(
[reduce_relay]
vp_base_squelch_max_selected_peers=6
)rippleConfig";

            c1.loadFromString(toLoad);
            BEAST_EXPECT(c1.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS == 6);

            Config c2;

            toLoad = R"rippleConfig(
[reduce_relay]
vp_base_squelch_max_selected_peers=2
)rippleConfig";

            std::string error;
            auto const expectedError =
                "Invalid reduce_relay"
                " vp_base_squelch_max_selected_peers must be "
                "greater than or equal to 3";
            try
            {
                c2.loadFromString(toLoad);
            }
            catch (std::runtime_error& e)
            {
                error = e.what();
            }

            BEAST_EXPECT(error == expectedError);
        });
    }

    void
    testBaseSquelchReady(bool log)
    {
        doTest("BaseSquelchReady", log, [&](bool log) {
            ManualClock::reset();
            auto createSlots = [&](bool baseSquelchEnabled)
                -> reduce_relay::Slots<ManualClock> {
                env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
                    baseSquelchEnabled;
                return reduce_relay::Slots<ManualClock>(
                    env_.app().logs(), network_.overlay(), env_.app().config());
            };
            // base squelching must not be ready if squelching is disabled
            BEAST_EXPECT(!createSlots(false).baseSquelchReady());

            // base squelch must not be ready as not enough time passed from
            // bootup
            BEAST_EXPECT(!createSlots(true).baseSquelchReady());

            ManualClock::advance(reduce_relay::WAIT_ON_BOOTUP + minutes{1});

            // base squelch enabled and bootup time passed
            BEAST_EXPECT(createSlots(true).baseSquelchReady());

            // even if time passed, base squelching must not be ready if turned
            // off in the config
            BEAST_EXPECT(!createSlots(false).baseSquelchReady());
        });
    }

    void
    testInternalHashRouter(bool log)
    {
        doTest("Duplicate Message", log, [&](bool log) {
            network_.reset();
            // update message count for the same peer/validator
            std::int16_t nMessages = 5;
            for (int i = 0; i < nMessages; i++)
            {
                uint256 key(i);
                network_.overlay().updateSlotAndSquelch(
                    key,
                    network_.validator(0),
                    0,
                    [&](PublicKey const&, PeerWPtr, std::uint32_t) {});
            }
            auto peers = network_.overlay().getPeers(network_.validator(0));
            // first message changes Slot state to Counting and is not counted,
            // hence '-1'.
            BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
            // add duplicate
            uint256 key(nMessages - 1);
            network_.overlay().updateSlotAndSquelch(
                key,
                network_.validator(0),
                0,
                [&](PublicKey const&, PeerWPtr, std::uint32_t) {});
            // confirm the same number of messages
            peers = network_.overlay().getPeers(network_.validator(0));
            BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
            // advance the clock
            ManualClock::advance(reduce_relay::IDLED + seconds(1));
            network_.overlay().updateSlotAndSquelch(
                key,
                network_.validator(0),
                0,
                [&](PublicKey const&, PeerWPtr, std::uint32_t) {});
            peers = network_.overlay().getPeers(network_.validator(0));
            // confirm message number increased
            BEAST_EXPECT(std::get<1>(peers[0]) == nMessages);
        });
    }

    struct Handler : public reduce_relay::SquelchHandler
    {
        Handler() : maxDuration_(0)
        {
        }
        void
        squelch(PublicKey const&, Peer::id_t, std::uint32_t duration)
            const override
        {
            if (duration > maxDuration_)
                maxDuration_ = duration;
        }
        void
        unsquelch(PublicKey const&, Peer::id_t) const override
        {
        }
        mutable int maxDuration_;
    };

    void
    testRandomSquelch(bool l)
    {
        doTest("Random Squelch", l, [&](bool l) {
            PublicKey validator = std::get<0>(randomKeyPair(KeyType::ed25519));
            Handler handler;

            auto run = [&](int npeers) {
                handler.maxDuration_ = 0;
                reduce_relay::Slots<ManualClock> slots(
                    env_.app().logs(), handler, env_.app().config());
                // 1st message from a new peer switches the slot
                // to counting state and resets the counts of all peers +
                // MAX_MESSAGE_THRESHOLD + 1 messages to reach the threshold
                // and switch the slot's state to peer selection.
                for (int m = 1; m <= reduce_relay::MAX_MESSAGE_THRESHOLD + 2;
                     m++)
                {
                    for (int peer = 0; peer < npeers; peer++)
                    {
                        // make unique message hash to make the
                        // slot's internal hash router accept the message
                        std::uint64_t mid = m * 1000 + peer;
                        uint256 const message{mid};
                        slots.updateSlotAndSquelch(
                            message,
                            validator,
                            peer,
                            protocol::MessageType::mtVALIDATION);
                    }
                }
                // make Slot's internal hash router expire all messages
                ManualClock::advance(hours(1));
            };

            using namespace reduce_relay;
            // expect max duration less than MAX_UNSQUELCH_EXPIRE_DEFAULT with
            // less than or equal to 60 peers
            run(20);
            BEAST_EXPECT(
                handler.maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
                handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
            run(60);
            BEAST_EXPECT(
                handler.maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
                handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
            // expect max duration greater than MIN_UNSQUELCH_EXPIRE and less
            // than MAX_UNSQUELCH_EXPIRE_PEERS with peers greater than 60
            // and less than 360
            run(350);
            // can't make this condition stronger. squelch
            // duration is probabilistic and max condition may still fail.
            // log when the value is low
            BEAST_EXPECT(
                handler.maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
                handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count());
            using namespace beast::unit_test::detail;
            if (handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count())
                log << make_reason(
                           "warning: squelch duration is low",
                           __FILE__,
                           __LINE__)
                    << std::endl
                    << std::flush;
            // more than 400 is still less than MAX_UNSQUELCH_EXPIRE_PEERS
            run(400);
            BEAST_EXPECT(
                handler.maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
                handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count());
            if (handler.maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count())
                log << make_reason(
                           "warning: squelch duration is low",
                           __FILE__,
                           __LINE__)
                    << std::endl
                    << std::flush;
        });
    }

    void
    testHandshake(bool log)
    {
        doTest("Handshake", log, [&](bool log) {
            auto setEnv = [&](bool enable) {
                Config c;
                std::stringstream str;
                str << "[reduce_relay]\n"
                    << "vp_enable=" << enable << "\n"
                    << "[compression]\n"
                    << "1\n";
                c.loadFromString(str.str());
                env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
                    c.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE;

                env_.app().config().COMPRESSION = c.COMPRESSION;
            };
            auto handshake = [&](int outboundEnable, int inboundEnable) {
                beast::IP::Address addr =
                    boost::asio::ip::make_address("172.1.1.100");

                setEnv(outboundEnable);
                auto request = ripple::makeRequest(
                    true,
                    env_.app().config().COMPRESSION,
                    false,
                    env_.app().config().TX_REDUCE_RELAY_ENABLE,
                    env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
                http_request_type http_request;
                http_request.version(request.version());
                http_request.base() = request.base();
                // feature enabled on the peer's connection only if both sides
                // are enabled
                auto const peerEnabled = inboundEnable && outboundEnable;
                // inbound is enabled if the request's header has the feature
                // enabled and the peer's configuration is enabled
                auto const inboundEnabled = peerFeatureEnabled(
                    http_request, FEATURE_VPRR, inboundEnable);
                BEAST_EXPECT(!(peerEnabled ^ inboundEnabled));

                setEnv(inboundEnable);
                auto http_resp = ripple::makeResponse(
                    true,
                    http_request,
                    addr,
                    addr,
                    uint256{1},
                    1,
                    {1, 0},
                    env_.app());
                // outbound is enabled if the response's header has the feature
                // enabled and the peer's configuration is enabled
                auto const outboundEnabled =
                    peerFeatureEnabled(http_resp, FEATURE_VPRR, outboundEnable);
                BEAST_EXPECT(!(peerEnabled ^ outboundEnabled));
            };
            handshake(1, 1);
            handshake(1, 0);
            handshake(0, 1);
            handshake(0, 0);
        });
    }

    jtx::Env env_;
    Network network_;

public:
    reduce_relay_test()
        : env_(*this, jtx::envconfig([](std::unique_ptr<Config> cfg) {
            cfg->VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = true;
            cfg->VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 6;
            return cfg;
        }))
        , network_(env_.app())
    {
    }

    void
    run() override
    {
        bool log = false;
        testConfig(log);
        testInitialRound(log);
        testPeerUnsquelchedTooSoon(log);
        testPeerUnsquelched(log);
        testNewPeer(log);
        testSquelchedPeerDisconnects(log);
        testSelectedPeerDisconnects(log);
        testSelectedPeerStopsRelaying(log);
        testInternalHashRouter(log);
        testRandomSquelch(log);
        testHandshake(log);
        testBaseSquelchReady(log);
    }
};

class reduce_relay_simulate_test : public reduce_relay_test
{
    void
    testRandom(bool log)
    {
        doTest("Random Test", log, [&](bool log) { random(log); });
    }

    void
    run() override
    {
        bool log = false;
        testRandom(log);
    }
};

BEAST_DEFINE_TESTSUITE(reduce_relay, overlay, ripple);
BEAST_DEFINE_TESTSUITE_MANUAL(reduce_relay_simulate, overlay, ripple);

}  // namespace test

}  // namespace ripple
